/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.db.sync.externalpipe;

import org.apache.commons.lang3.Validate;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.sync.datasource.PipeOpManager;
import org.apache.iotdb.db.sync.datasource.PipeStorageGroupInfo;
import org.apache.iotdb.db.sync.externalpipe.operation.DeleteOperation;
import org.apache.iotdb.db.sync.externalpipe.operation.InsertOperation;
import org.apache.iotdb.db.sync.externalpipe.operation.Operation;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
import org.apache.iotdb.pipe.external.api.ExternalPipeSinkWriterStatus;
import org.apache.iotdb.pipe.external.api.IExternalPipeSinkWriter;
import org.apache.iotdb.pipe.external.api.IExternalPipeSinkWriterFactory;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static java.lang.Math.abs;

/**
 * This class handles 1 external plugin's work, including multiple working threads. All
 * StorageGroups will be distributed to different thread according to hash value. Every working
 * thread is responsible for put several StorageGroups' data into external sink.
 */
public class ExtPipePlugin {
    private static final Logger logger = LoggerFactory.getLogger(ExtPipePlugin.class);

    private String extPipeTypeName;
    // ParamKey => ParamValue
    Map<String, String> sinkParams;
    private PipeOpManager pipeOpManager;
    private ExtPipePluginManager extPipePluginManager;

    private IExternalPipeSinkWriterFactory pipeSinkWriterFactory;
    private ExtPipePluginConfiguration configuration;

    private volatile boolean alive = false;
    // ArrayList: save DataTransmissionTasks' info
    private List<DataTransmissionTask> dataTransmissionTasks;
    private ExecutorService executorService;

    // SG => DataCommitIndex
    private Map<String, Long> dataCommitMap = new ConcurrentHashMap<>();
    // Writer method name => (exception message => count)
    private Map<String, Map<String, AtomicInteger>> writerInvocationFailures;
    private int timestampDivisor;

    public ExtPipePlugin(
            String extPipeTypeName,
            Map<String, String> sinkParams,
            ExtPipePluginManager extPipePluginManager,
            PipeOpManager pipeOpManager) {
        this.extPipeTypeName = extPipeTypeName;
        this.sinkParams = sinkParams;
        this.pipeOpManager = pipeOpManager;
        this.extPipePluginManager = extPipePluginManager;

        String timePrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
        switch (timePrecision) {
            case "ms":
                timestampDivisor = 1;
                break;
            case "us":
                timestampDivisor = 1_000;
                break;
            case "ns":
                timestampDivisor = 1_000_000;
                break;
            default:
                throw new IllegalArgumentException("Unrecognized time precision: " + timePrecision);
        }
    }

    @TestOnly
    public ExtPipePlugin(
            String Name,
            IExternalPipeSinkWriterFactory factory,
            ExtPipePluginConfiguration conf,
            TsFilePipe tsFilePipe) {
    }

    @TestOnly
    public void setPipeSinkWriterFactory(IExternalPipeSinkWriterFactory pipeSinkWriterFactory) {
        this.pipeSinkWriterFactory = pipeSinkWriterFactory;
    }

    /**
     * Get the parameter value from sinkParams that is from CMD
     *
     * @param paramName
     * @param defaultValue
     * @return
     */
    private int getIntParam(String paramName, int defaultValue) {
        String valueStr = sinkParams.get(paramName);
        if (valueStr == null) {
            return defaultValue;
        }
        return Integer.parseInt(valueStr);
    }

    /**
     * start and init all working threads
     *
     * @throws IOException
     */
    public void start() throws IOException {
        logger.debug("ExtPipePlugin start(), extPipeName={}.", extPipeTypeName);

        if (alive) {
            String errMsg = "Can not re-run alive External pipe: " + extPipeTypeName + ".";
            logger.error(errMsg);
            throw new IllegalStateException(errMsg);
        }

        int threadNum = getIntParam("thread_num", ExtPipePluginConfiguration.DEFAULT_NUM_OF_THREADS);
        int batchSize =
                getIntParam("batch_size", ExtPipePluginConfiguration.DEFAULT_OPERATION_BATCH_SIZE);
        int attemptTimes =
                getIntParam("attempt_times", ExtPipePluginConfiguration.DEFAULT_ATTEMPT_TIMES);
        int backoffInterval =
                getIntParam("retry_interval", ExtPipePluginConfiguration.DEFAULT_BACKOFF_INTERVAL);

        try {
            // == Pipe configuration
            configuration =
                    new ExtPipePluginConfiguration.Builder(extPipeTypeName)
                            .numOfThreads(threadNum)
                            .operationBatchSize(batchSize)
                            .attemptTimes(attemptTimes)
                            .backOffInterval(backoffInterval)
                            .build();

            if (pipeSinkWriterFactory == null) {
                pipeSinkWriterFactory =
                        ExtPipePluginRegister.getInstance().getWriteFactory(extPipeTypeName);
            }

            // Try to init externalPipeSinkWriterFactory, also check the PIPESink parameter
            pipeSinkWriterFactory.initialize(sinkParams);
        } catch (Exception e) {
            logger.error("Failed to start External Pipe: {}.", extPipeTypeName, e);
            throw new IOException(
                    "Failed to start External Pipe: " + extPipeTypeName + ". " + e.getMessage());
        }

        alive = true;
        logger.info("External pipe " + extPipeTypeName + " begin to START");

        // == Launch pipe worker threads
        executorService =
                IoTDBThreadPoolFactory.newFixedThreadPool(
                        threadNum, ThreadName.EXT_PIPE_PLUGIN_WORKER.getName() + "-" + extPipeTypeName);

        // == Start threads that will run external PiPeSink plugin
        dataTransmissionTasks = new ArrayList<>(threadNum);
        for (int i = 0; i < threadNum; i++) {
            IExternalPipeSinkWriter writer = pipeSinkWriterFactory.get();
            DataTransmissionTask dataTransmissionTask =
                    new DataTransmissionTask(writer, i, configuration);
            dataTransmissionTasks.add(dataTransmissionTask);
            executorService.submit(dataTransmissionTask);
        }

        writerInvocationFailures = new ConcurrentHashMap<>();

        logger.info("External pipe " + extPipeTypeName + " finish START.");
    }

    /**
     * Stop all working threads
     */
    public void stop() {
        if (!alive) {
            String errMsg = "Error: External pipe " + extPipeTypeName + " has not started.";
            logger.error(errMsg);
            throw new IllegalStateException(errMsg);
        }
        alive = false;

        executorService.shutdown();
        boolean isExecutorServiceTerminated = false;
        try {
            isExecutorServiceTerminated = executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            logger.error(
                    "Interrupted when waiting for the termination of external pipe, " + extPipeTypeName, e);
        } finally {
            if (!isExecutorServiceTerminated) {
                logger.warn(
                        "ExtPipePlugin stop(), graceful termination of external pipe {} timed out. So force terminating working threads.",
                        extPipeTypeName);
                executorService.shutdownNow();
            }
        }
    }

    public boolean isAlive() {
        return alive;
    }

    /**
     * Collect the status of External Pipe.
     *
     * @return
     */
    public ExternalPipeStatus getStatus() {
        ExternalPipeStatus status = new ExternalPipeStatus();

        try {
            List<ExternalPipeSinkWriterStatus> writerStatuses =
                    dataTransmissionTasks.stream()
                            .map(DataTransmissionTask::getStatus)
                            .collect(Collectors.toList());
            status.setWriterStatuses(writerStatuses);
        } catch (Exception e) {
            handleExceptionsThrownByWriter("getStatus", e);
        }

        status.setAlive(alive);
        status.setWriterInvocationFailures(writerInvocationFailures);
        return status;
    }

    private void handleExceptionsThrownByWriter(String method, Exception e) {
        writerInvocationFailures.computeIfAbsent(method, m -> new ConcurrentHashMap<>());
        String eMsg = e.getMessage();
        if (eMsg == null) {
            eMsg = "N/A";
        }
        writerInvocationFailures.get(method).computeIfAbsent(eMsg, msg -> new AtomicInteger(0));
        writerInvocationFailures.get(method).get(eMsg).incrementAndGet();
        logger.info("Exception thrown from writer", e);
    }

    /**
     * Get the data committed index of dedicated StorageGroup
     *
     * @param sgName
     * @return
     */
    public long getDataCommitIndex(String sgName) {
        return dataCommitMap.getOrDefault(sgName, Long.MIN_VALUE);
    }

    private int getThreadIndex(String sgName) {
        return (abs(sgName.hashCode()) % configuration.getNumOfThreads());
    }

    /**
     * Notify every working thread task that new data arrive
     *
     * @param sgName
     * @param newDataBeginIndex
     * @param newDataCount
     */
    public void notifyNewDataArrive(String sgName, long newDataBeginIndex, long newDataCount) {
        logger.debug(
                "notifyNewDataArrive(), sgName={}, newDataBeginIndex={}, newDataCount={}",
                sgName,
                newDataBeginIndex,
                newDataCount);

        DataTransmissionTask dataTransmissionTask = dataTransmissionTasks.get(getThreadIndex(sgName));
        if (dataTransmissionTask != null) {
            dataTransmissionTask.notifyNewDataArrive(sgName, newDataBeginIndex, newDataCount);
        }
    }

    /**
     * working thread class Every thread take 1 IExternalPipeSinkWriter and is responsible for several
     * StorageGroup's data
     */
    private class DataTransmissionTask implements Callable<Void> {

        private final IExternalPipeSinkWriter writer;
        private final int threadIndex;
        private final ExtPipePluginConfiguration configuration;

        // SG ==> PipeStorageGroupInfo
        private Map<String, PipeStorageGroupInfo> sgInfoMap;

        private long nextIndex;
        private String lastReadSgName;

        // condition-lock for notifying new data arriving
        private byte[] newDataLocker = new byte[0];
        private long newDataCounter = 0L;

        DataTransmissionTask(
                IExternalPipeSinkWriter writer, int threadIndex, ExtPipePluginConfiguration configuration)
                throws IOException {
            this.writer = Validate.notNull(writer);
            this.threadIndex = threadIndex;
            this.configuration = configuration;

            this.sgInfoMap = configuration.getBucketSgInfoMap(threadIndex);

            this.writer.open();
        }

        /**
         * Get 1 SG's next data index that will be read according to past reading record. If there is no
         * SG in local record, get the first valid data index in PIPE data source.
         *
         * @param sgName
         * @return
         */
        private long getSgNextDataIndex(String sgName) {
            PipeStorageGroupInfo sgInfo = sgInfoMap.get(sgName);
            if (sgInfo != null) {
                return sgInfo.getNextReadIndex();
            }

            long nextReadIndex = pipeOpManager.getCommittedIndex(sgName) + 1;
            sgInfoMap.put(sgName, new PipeStorageGroupInfo(sgName, nextReadIndex - 1, nextReadIndex));

            return nextReadIndex;
        }

        /**
         * save the data index info to local record.
         *
         * @param sgName
         * @param nextReadIndex
         * @param committedIndex
         */
        private void setSgNextDataIndex(String sgName, long nextReadIndex, long committedIndex) {
            logger.debug(
                    "setSgNextDataIndex(), sgName={}, nextReadIndex={}, committedIndex={}.",
                    sgName,
                    nextReadIndex,
                    committedIndex);

            PipeStorageGroupInfo sgInfo =
                    sgInfoMap.computeIfAbsent(sgName, k -> new PipeStorageGroupInfo(sgName, -1, 0));

            sgInfo.setNextReadIndex(nextReadIndex);
            sgInfo.setCommittedIndex(committedIndex);
        }

        /**
         * commit data to let Pipe data source may remove useless data.
         *
         * @param sgName
         * @param committedIndex
         */
        private void commitData(String sgName, long committedIndex) throws IOException {
            logger.debug("commitData(), sgName={}, committedIndex={}.", sgName, committedIndex);

            dataCommitMap.put(sgName, committedIndex);

            if (pipeOpManager.opBlockNeedCommit(sgName, committedIndex)) {
                extPipePluginManager.triggerCommit(sgName, committedIndex);
            }
        }

        /**
         * check whether PIPE data source has new data for dedicate StorageGroup
         *
         * @param sgName
         * @return
         */
        private boolean sgHasNewData(String sgName) {
            PipeStorageGroupInfo pipeStorageGroupInfo = sgInfoMap.get(sgName);
            if (pipeStorageGroupInfo == null) {
                return true;
            }

            long nextReadIndex = pipeStorageGroupInfo.getNextReadIndex();
            long nextIndex = pipeOpManager.getNextIndex(sgName);
            if (nextIndex > nextReadIndex) {
                return true;
            }

            return false;
        }

        /**
         * Notify current working thread that new data arrive.
         *
         * @param sgName
         * @param newDataBeginIndex
         * @param newDataCount
         */
        public void notifyNewDataArrive(String sgName, long newDataBeginIndex, long newDataCount) {
            synchronized (newDataLocker) {
                newDataCounter++;
                newDataLocker.notifyAll();
            }
        }

        /**
         * Check/Wait new data/operation
         *
         * @return StorageGroup Name that has new data
         * @throws InterruptedException
         */
        public String waitForOperations() throws InterruptedException {
            if (lastReadSgName != null) {
                if (sgHasNewData(lastReadSgName)) {
                    return lastReadSgName;
                }
                lastReadSgName = null;
            }

            // find new data for the StorageGroup in sgSet
            while (alive) {
                Set<String> sgSet = pipeOpManager.getSgSet();
                Iterator<String> iter = sgSet.iterator();
                while (iter.hasNext()) {
                    String sgName = iter.next();
                    if (threadIndex != getThreadIndex(sgName)) {
                        continue;
                    }

                    if (sgHasNewData(sgName)) {
                        lastReadSgName = sgName;
                        return sgName;
                    }
                }

                synchronized (newDataLocker) {
                    if (newDataCounter <= 0L) {
                        try {
                            newDataLocker.wait(15000); // maximum time 15 seconds
                        } catch (InterruptedException ignored) {
                        }
                    }
                    newDataCounter = 0L;
                }
            }

            return null;
        }

        /**
         * Working thread Entrance. Read data from pipe and send data to external sink.
         *
         * @return
         * @throws Exception
         */
        @Override
        public Void call() throws Exception {
            logger.info("ExternalPipeWorker start. thread={}.", Thread.currentThread().getName());

            while (alive) {
                try {
                    // String sgName = pipeOpManager.waitForOperations(threadIndex, sgInfoMap);
                    String sgName = waitForOperations();
                    if (sgName == null) {
                        continue;
                    }

                    Operation operation = null;
                    try {
                        operation =
                                pipeOpManager.getOperation(
                                        sgName, getSgNextDataIndex(sgName), configuration.getOperationBatchSize());
                    } catch (IOException e) {
                        continue;
                    }

                    if ((operation == null) || (operation.getDataCount() <= 0)) {
                        continue;
                    }

                    if (!handleOperationWithRetry(sgName, operation)) {
                        logger.error(
                                "Failed to handle operation after "
                                        + configuration.getAttemptTimes()
                                        + " attempts: "
                                        + operation);
                    }

                    if (!flushWithRetry()) {
                        logger.error(
                                "Failed to flush operations after "
                                        + configuration.getAttemptTimes()
                                        + " attempts: startIndex="
                                        + operation.getStartIndex()
                                        + ",endIndex="
                                        + operation.getEndIndex());
                    }
                    // TODO: Reminder, May cause data loss if the flush failed.

                    // Update the next index and commit to the pipe source.
                    nextIndex = operation.getEndIndex();
                    setSgNextDataIndex(sgName, nextIndex, nextIndex - 1);
                    commitData(sgName, nextIndex - 1);
                } catch (InterruptedException e) {
                    break;
                } catch (Exception e) {
                    logger.error("Unexpected system exception", e);
                }
            }

            try {
                writer.close();
            } catch (IOException e) {
                handleExceptionsThrownByWriter("close", e);
                logger.info("Exception happened when closing the writer", e);
            }

            logger.info("ExternalPipeWorker exits. Thread={}", Thread.currentThread().getName());
            return null;
        }

        public ExternalPipeSinkWriterStatus getStatus() {
            return writer.getStatus();
        }

        private boolean handleOperationWithRetry(String sgName, Operation operation) {
            boolean succeed = false;
            int attemptTimes = configuration.getAttemptTimes();
            while (alive && attemptTimes > 0) {
                // Retry
                try {
                    pushOperationToExtPipe(sgName, operation);
                    succeed = true;
                    break;
                } catch (Exception e) {
                    logger.error("When handle operation {}, Exception", operation.getOperationType(), e);
                    handleExceptionsThrownByWriter(operation.getOperationTypeName(), e);
                }
                attemptTimes--;
                // Backoff
                try {
                    Thread.sleep(configuration.getBackOffInterval());
                } catch (InterruptedException ignored) {
                }
            }
            return succeed;
        }

        private void pushOperationToExtPipe(String sgName, Operation operation)
                throws IOException, IllegalArgumentException {
            if (operation instanceof InsertOperation) {
                handleInsertOperation(sgName, (InsertOperation) operation);
                return;
            }

            if (operation instanceof DeleteOperation) {
                handleDeleteOperation(sgName, (DeleteOperation) operation);
                return;
            }

            logger.error("pushOperationToExtPipe(), Unrecognized Operation: {}", operation);
            throw new IllegalArgumentException(
                    "pushOperationToExtPipe(), Unrecognized Operation:" + operation);
        }

        private void handleInsertOperation(String sgName, InsertOperation operation)
                throws IOException, IllegalArgumentException {
            for (Pair<MeasurementPath, List<TimeValuePair>> dataPair : operation.getDataList()) {
                MeasurementPath measurementPath = dataPair.left;
                for (TimeValuePair tvPair : dataPair.right) {
                    if (tvPair == null) {
                        continue;
                    }
                    String[] path = measurementPath.getNodes();
                    long timestampInMs = tvPair.getTimestamp() / timestampDivisor;
                    switch (tvPair.getValue().getDataType()) {
                        case BOOLEAN:
                            writer.insertBoolean(sgName, path, timestampInMs, tvPair.getValue().getBoolean());
                            break;
                        case INT32:
                            writer.insertInt32(sgName, path, timestampInMs, tvPair.getValue().getInt());
                            break;
                        case INT64:
                            writer.insertInt64(sgName, path, timestampInMs, tvPair.getValue().getLong());
                            break;
                        case FLOAT:
                            writer.insertFloat(sgName, path, timestampInMs, tvPair.getValue().getFloat());
                            break;
                        case DOUBLE:
                            writer.insertDouble(sgName, path, timestampInMs, tvPair.getValue().getDouble());
                            break;
                        case TEXT:
                            writer.insertText(sgName, path, timestampInMs, tvPair.getValue().getStringValue());
                            break;
                        //            case VECTOR:
                        //              writer.insertVector(
                        //                  sgName,
                        //                  path,
                        //                  Arrays.stream(tvPair.getValue().getVector())
                        //                      .map(TsPrimitiveType::getDataType)
                        //                      .map(type -> DataType.fromTsDataType(type.serialize()))
                        //                      .toArray(DataType[]::new),
                        //                  timestampInMs,
                        //                  Arrays.stream(tvPair.getValue().getVector())
                        //                      .map(TsPrimitiveType::getValue)
                        //                      .toArray(Object[]::new));
                        //              break;
                        default:
                            throw new IllegalArgumentException(
                                    "Unrecognized data type " + tvPair.getValue().getDataType());
                    }
                }
            }
        }

        private void handleDeleteOperation(String sgName, DeleteOperation deleteOperation)
                throws IOException {
            writer.delete(
                    sgName,
                    deleteOperation.getDeletePathStr(),
                    deleteOperation.getStartTime() / timestampDivisor,
                    deleteOperation.getEndTime() / timestampDivisor);
        }

        private boolean flushWithRetry() {
            try {
                writer.flush();
            } catch (IOException e1) {
                logger.error("Exception happened when flushing operations", e1);
                handleExceptionsThrownByWriter("flush", e1);
                boolean succeed = false;
                int attemptTimes = 1;
                while (alive && attemptTimes < configuration.getAttemptTimes()) {
                    // Backoff
                    try {
                        Thread.sleep(configuration.getBackOffInterval());
                    } catch (InterruptedException ignored) {
                    }
                    // Retry
                    try {
                        writer.flush();
                        succeed = true;
                        break;
                    } catch (Exception e2) {
                        handleExceptionsThrownByWriter("flush", e2);
                    }
                    attemptTimes += 1;
                }
                return succeed;
            }
            return true;
        }
    }
}
